package io.github.quickmsg.core.mqtt;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.transport.Transport;
import io.github.quickmsg.core.cluster.ClusterReceiver;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author luxurong
 */
@Getter
@Slf4j
public class MqttReceiveContext extends AbstractReceiveContext<MqttConfiguration> {

    // 集群接收者
    private final ClusterReceiver clusterReceiver;

    public MqttReceiveContext(MqttConfiguration configuration, Transport<MqttConfiguration> transport) {
        super(configuration, transport);
        this.clusterReceiver = new ClusterReceiver(this);
        // 集群注册
        clusterReceiver.registry();
    }

    public void apply(MqttChannel mqttChannel) {
        mqttChannel
                // 注册销毁事件
                .registryDelayTcpClose()
                // 获取连接
                .getConnection()
                // 事件传入ChannelPipeline，则为inbound;  事件传出ChannelPipeline，则为outbound;
                .inbound()
                // 接收数据
                .receiveObject()
                // 将接收的数据类型转换
                .cast(MqttMessage.class)
                // 如果发生错误，日志记录
                .onErrorContinue((throwable, o) -> {
                    log.error("on message error {}",o,throwable);
                })
                // 进行过滤，只需要能够正常解码的MQTT消息
                .filter(mqttMessage -> mqttMessage.decoderResult().isSuccess())
                // 订阅消息的处理流
                .subscribe(mqttMessage -> this.accept(mqttChannel, new SmqttMessage<>(mqttMessage,System.currentTimeMillis(),Boolean.FALSE)));

    }


    @Override
    public void accept(MqttChannel mqttChannel, SmqttMessage<MqttMessage> mqttMessage) {
        this.getProtocolAdaptor().chooseProtocol(mqttChannel, mqttMessage, this);
    }


}
